/**
 * Copyright (C) @2014 Webank Group Holding Limited
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package cn.webank.framework.data.jedis.pool;

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisSentinelPool;
import redis.clients.jedis.Protocol;
import redis.clients.jedis.exceptions.JedisConnectionException;

/**
 * Webank JedisSentinelPool父类，如果需要监听主备切换事件消息，需要继承此类实现onSwitchMasterMessage
 * @author jonyang
 *
 */
public class WeBankJedisSentinelPool extends JedisSentinelPool {

	// 记录webank自定义的MasterListener集合
	protected Set<WeBankMasterListener> webankMasterListeners = new HashSet<WeBankMasterListener>();

	// 用于做锁的处理
	private volatile HostAndPort currentHostMaster;

	// 用于并发处理的锁
	private ReentrantLock lock = new ReentrantLock();

	public WeBankJedisSentinelPool(String masterName, Set<String> sentinels,
			final GenericObjectPoolConfig poolConfig) {
		this(masterName, sentinels, poolConfig, Protocol.DEFAULT_TIMEOUT, null,
				Protocol.DEFAULT_DATABASE);
	}

	public WeBankJedisSentinelPool(String masterName, Set<String> sentinels) {
		this(masterName, sentinels, new GenericObjectPoolConfig(),
				Protocol.DEFAULT_TIMEOUT, null, Protocol.DEFAULT_DATABASE);
	}

	public WeBankJedisSentinelPool(String masterName, Set<String> sentinels,
			String password) {
		this(masterName, sentinels, new GenericObjectPoolConfig(),
				Protocol.DEFAULT_TIMEOUT, password);
	}

	public WeBankJedisSentinelPool(String masterName, Set<String> sentinels,
			final GenericObjectPoolConfig poolConfig, int timeout,
			final String password) {
		this(masterName, sentinels, poolConfig, timeout, password,
				Protocol.DEFAULT_DATABASE);
	}

	public WeBankJedisSentinelPool(String masterName, Set<String> sentinels,
			final GenericObjectPoolConfig poolConfig, final int timeout) {
		this(masterName, sentinels, poolConfig, timeout, null,
				Protocol.DEFAULT_DATABASE);
	}

	public WeBankJedisSentinelPool(String masterName, Set<String> sentinels,
			final GenericObjectPoolConfig poolConfig, final String password) {
		this(masterName, sentinels, poolConfig, Protocol.DEFAULT_TIMEOUT,
				password);
	}

	public WeBankJedisSentinelPool(String masterName, Set<String> sentinels,
			final GenericObjectPoolConfig poolConfig, int timeout,
			final String password, final int database) {
		super(masterName, sentinels, poolConfig, Protocol.DEFAULT_TIMEOUT,
				password, database);

		for (String sentinel : sentinels) {
			final HostAndPort hap = toHostAndPort(Arrays.asList(sentinel
					.split(":")));
			WeBankMasterListener weBankMasterListener = new WeBankMasterListener(
					masterName, hap.getHost(), hap.getPort());
			webankMasterListeners.add(weBankMasterListener);
			weBankMasterListener.start();
		}
	}

	private HostAndPort toHostAndPort(List<String> getMasterAddrByNameResult) {
		String host = getMasterAddrByNameResult.get(0);
		int port = Integer.parseInt(getMasterAddrByNameResult.get(1));

		return new HostAndPort(host, port);
	}

	public void destroy() {
		for (MasterListener m : webankMasterListeners) {
			m.shutdown();
		}
		super.destroy();
	}

	protected class WeBankMasterListener extends MasterListener {

		protected String masterName;
		protected String host;
		protected int port;
		protected long subscribeRetryWaitTimeMillis = 5000;
		protected Jedis j;
		protected AtomicBoolean running = new AtomicBoolean(false);

		protected WeBankMasterListener() {
		}

		public WeBankMasterListener(String masterName, String host, int port) {
			this.masterName = masterName;
			this.host = host;
			this.port = port;
		}

		public WeBankMasterListener(String masterName, String host, int port,
				long subscribeRetryWaitTimeMillis) {
			this(masterName, host, port);
			this.subscribeRetryWaitTimeMillis = subscribeRetryWaitTimeMillis;
		}

		public void run() {

			running.set(true);

			while (running.get()) {

				j = new Jedis(host, port);

				try {
					j.subscribe(new JedisPubSubAdapter() {
						@Override
						public void onMessage(String channel, String message) {
							log.fine("Sentinel " + host + ":" + port
									+ " published: " + message + ".");

							String[] switchMasterMsg = message.split(" ");

							if (switchMasterMsg.length > 3) {

								if (masterName.equals(switchMasterMsg[0])) {
									try {
										handleSwitchMasterMessage(toHostAndPort(Arrays
												.asList(switchMasterMsg[3],
														switchMasterMsg[4])));
									} catch (Exception e) {
										e.printStackTrace();
										log.severe("handleSwitchMasterMessage error:"
												+ e.getMessage());
									}
								} else {
									log.fine("Ignoring message on +switch-master for master name "
											+ switchMasterMsg[0]
											+ ", our master name is "
											+ masterName);
								}

							} else {
								log.severe("Invalid message received on Sentinel "
										+ host
										+ ":"
										+ port
										+ " on channel +switch-master: "
										+ message);
							}
						}
					}, "+switch-master");

				} catch (JedisConnectionException e) {

					if (running.get()) {
						log.severe("Lost connection to Sentinel at " + host
								+ ":" + port
								+ ". Sleeping 5000ms and retrying.");
						try {
							Thread.sleep(subscribeRetryWaitTimeMillis);
						} catch (InterruptedException e1) {
							e1.printStackTrace();
						}
					} else {
						log.fine("Unsubscribing from Sentinel at " + host + ":"
								+ port);
					}
				}
			}
		}

		public void shutdown() {
			try {
				log.fine("Shutting down listener on " + host + ":" + port);
				running.set(false);
				// This isn't good, the Jedis object is not thread safe
				j.disconnect();
			} catch (Exception e) {
				log.severe("Caught exception while shutting down: "
						+ e.getMessage());
			}
		}
	}

	/**
	 * 处理主备切换处理，需要考虑线程安全并发的情况
	 * 
	 * @param master
	 * @throws Exception
	 */
	private void handleSwitchMasterMessage(HostAndPort master) throws Exception {
		if (!master.equals(currentHostMaster)) {
			this.currentHostMaster = master;
			try {
				lock.lock();
				onSwitchMasterMessage(master);
			} finally {
				lock.unlock();
			}
		}
	}

	/**
	 * 系统自己实现业务逻辑
	 * 
	 * @param master
	 *            redis主服务的对象
	 */
	protected void onSwitchMasterMessage(HostAndPort master) {
//		System.out.println("onSwitchMasterMessage:" + master.getHost() + ","
//				+ master.getPort());
	}

}
